{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Get `AVGStarRating` from Kinesis Data Stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "import pandas as pd\n",
    "\n",
    "sess   = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "region = boto3.Session().region_name\n",
    "\n",
    "sm = boto3.Session().client(service_name='sagemaker', region_name=region)\n",
    "kinesis = boto3.Session().client(service_name='kinesis', region_name=region)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r stream_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(stream_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# TODO:  Adapt to any number of shards\n",
    "print(kinesis.list_shards(StreamName=stream_name)['Shards'])\n",
    "\n",
    "shard_id_1 = kinesis.list_shards(StreamName=stream_name)['Shards'][0]['ShardId']\n",
    "print(shard_id_1)\n",
    "\n",
    "shard_id_2 = kinesis.list_shards(StreamName=stream_name)['Shards'][1]['ShardId']\n",
    "print(shard_id_2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Get Records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "shard_iter_1 = kinesis.get_shard_iterator(StreamName=stream_name, \n",
    "                                          ShardId=shard_id_1, \n",
    "                                          ShardIteratorType='TRIM_HORIZON')['ShardIterator']\n",
    "\n",
    "shard_iter_2 = kinesis.get_shard_iterator(StreamName=stream_name, \n",
    "                                          ShardId=shard_id_2, \n",
    "                                          ShardIteratorType='TRIM_HORIZON')['ShardIterator']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "records_response_1 = kinesis.get_records(\n",
    "    ShardIterator=shard_iter_1,\n",
    "    Limit=10\n",
    ")\n",
    "\n",
    "print(records_response_1)\n",
    "\n",
    "if records_response_1['Records']:\n",
    "    print(records_response_1['Records'][0]['Data'].decode('utf-8'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "records_response_2 = kinesis.get_records(\n",
    "    ShardIterator=shard_iter_2,\n",
    "    Limit=10\n",
    ")\n",
    "\n",
    "print(records_response_2)\n",
    "\n",
    "if records_response_2['Records']:\n",
    "    print(records_response_2['Records'][0]['Data'].decode('utf-8'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(HTML('<b>Review <a target=\"blank\" href=\"https://console.aws.amazon.com/kinesis/home?region={}#/streams/details/{}/monitoring\"> Stream</a></b>'.format(region, stream_name)))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%javascript\n",
    "Jupyter.notebook.save_checkpoint();\n",
    "Jupyter.notebook.session.delete();"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
